Distributed Processing for Model Optimization

ipyparallel

  • Engine <-> Client
    • Engine: the process where the actual computation runs
    • Client: the interface used to control the engines
$ conda install ipyparallel

Starting and Stopping Engines

  • Start

    $ ipcluster start -n 4
  • Stop

    • Control-C

Client


In [4]:
from ipyparallel import Client
c = Client()
c.ids


Out[4]:
[0, 1]

In [5]:
dview = c[:]
dview


Out[5]:
<DirectView [0, 1]>

Map / Reduce

  • map(function, data): applies function to each element of data independently and returns the results
  • reduce(function, data): repeatedly combines elements with function, shrinking the intermediate results until a single value remains
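As a quick illustration of the two patterns in plain Python (no ipyparallel required; in Python 3 reduce lives in functools):

```python
from functools import reduce

data = [1, 2, 3, 4, 5]

# map: apply the function to every element independently
squares = list(map(lambda x: x * x, data))
print(squares)  # [1, 4, 9, 16, 25]

# reduce: fold the elements pairwise until one value remains
total = reduce(lambda a, b: a + b, data)
print(total)    # 15
```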

In [6]:
import numpy as np

def fahrenheit(T):
    # use float division so this works under Python 2 as well
    return 9.0 / 5 * T + 32

temp = np.arange(0, 110, 10)
temp


Out[6]:
array([  0,  10,  20,  30,  40,  50,  60,  70,  80,  90, 100])

In [7]:
# in Python 2, map returns a list; in Python 3 wrap it: list(map(...))
F = map(fahrenheit, temp)
F


Out[7]:
[32.0, 50.0, 68.0, 86.0, 104.0, 122.0, 140.0, 158.0, 176.0, 194.0, 212.0]

In [8]:
def create_prime(primes, n):
    # if n is divisible by any known prime, it is composite: keep primes as-is
    for p in primes:
        if n % p == 0:
            return primes
    # otherwise n is a new prime: extend the accumulator
    primes.append(n)
    return primes

In [9]:
# reduce is a builtin in Python 2; in Python 3 use: from functools import reduce
reduce(create_prime, np.arange(2, 100), [2])


Out[9]:
[2,
 3,
 5,
 7,
 11,
 13,
 17,
 19,
 23,
 29,
 31,
 37,
 41,
 43,
 47,
 53,
 59,
 61,
 67,
 71,
 73,
 79,
 83,
 89,
 97]

Parallel Map

  • Hands the map/reduce operations off to the engine processes so the work runs concurrently
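The call shape of dview.map_sync matches the standard library's executor map. A rough local analogue (using concurrent.futures threads rather than ipyparallel engines; note that threads share one interpreter, so for CPU-bound work like pyprimes below it is separate engine processes that actually buy speed):

```python
from concurrent.futures import ThreadPoolExecutor

def fahrenheit(T):
    return 9.0 / 5 * T + 32

# executor.map, like dview.map_sync, splits the iterable across
# workers and returns the results in the original order
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fahrenheit, range(0, 110, 10)))

print(results[-1])  # 212.0
```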

In [12]:
def pyprimes(kmax):
    # trial division: p holds the primes found so far (at most 1000)
    p = np.zeros(1000)
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        # check n against every prime found so far
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            # no prime found so far divides n, so n is prime
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result

In [13]:
%time result = map(pyprimes, range(700, 1000))


CPU times: user 40.5 s, sys: 0 ns, total: 40.5 s
Wall time: 41.2 s

In [14]:
%time parallel_result = dview.map_sync(pyprimes, range(700, 1000))


CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 41.1 s

In [15]:
parallel_result == result


Out[15]:
True

In [16]:
async_result = dview.map_async(pyprimes, range(700, 1000))

In [21]:
async_result.progress


Out[21]:
2

In [24]:
async_result.get()[0][-10:]


Out[24]:
[5189, 5197, 5209, 5227, 5231, 5233, 5237, 5261, 5273, 5279]
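dview.map_async returns immediately with an AsyncResult handle that is polled above via .progress and .get(). The standard library's pools offer a similar, though less featureful, handle as a rough analogue (it has ready()/wait()/get() but no progress counter):

```python
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

pool = ThreadPool(2)
async_result = pool.map_async(square, range(5))  # returns immediately

async_result.wait()          # block until all tasks finish
print(async_result.ready())  # True
print(async_result.get())    # [0, 1, 4, 9, 16]
pool.close()
```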

Saving Models

To distribute a model across processes, save and load it with the joblib.dump and joblib.load commands from the sklearn.externals subpackage.
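A minimal sketch of the save/load round trip. Note that in recent scikit-learn versions the sklearn.externals.joblib alias has been removed in favor of the standalone joblib package, so the import below tries both:

```python
import os
import tempfile

try:
    import joblib                         # modern standalone package
except ImportError:
    from sklearn.externals import joblib  # older scikit-learn versions

# any picklable object works; a fitted model is saved the same way
obj = {"coef": [0.5, -1.2], "intercept": 3.0}

path = os.path.join(tempfile.mkdtemp(), "model.pkl")
joblib.dump(obj, path)        # write to disk
restored = joblib.load(path)  # engines can load the same file

print(restored == obj)  # True
```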

Distributed Model Optimization Using ipyparallel


In [36]:
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline

news = fetch_20newsgroups(subset="all")
n_samples = 3000
X_train = news.data[:n_samples]
y_train = news.target[:n_samples]

model = Pipeline([
        ('vect', TfidfVectorizer(stop_words="english", token_pattern=r"\b[a-z0-9_\-\.]+[a-z][a-z0-9_\-\.]+\b")),
        ('svc', SVC()),
    ])

In [28]:
from sklearn.externals import joblib
import os
from sklearn.cross_validation import KFold, cross_val_score

def persist_cv_splits(X, y, K=3, name="data", suffix="_cv_%03d.pkl"):
    cv_split_filenames = []
    # split on the size of the data passed in, not the global n_samples
    cv = KFold(len(y), K, shuffle=True, random_state=0)
    for i, (train, test) in enumerate(cv):
        cv_fold = ([X[k] for k in train], y[train], 
                   [X[k] for k in test], y[test])
        cv_split_filename = name + suffix % i
        cv_split_filename = os.path.abspath(cv_split_filename)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)
    return cv_split_filenames

cv_filenames = persist_cv_splits(X_train, y_train, name="news")
cv_filenames


Out[28]:
['/home/dockeruser/notebooks/ml/news_cv_000.pkl',
 '/home/dockeruser/notebooks/ml/news_cv_001.pkl',
 '/home/dockeruser/notebooks/ml/news_cv_002.pkl']

In [29]:
def compute_evaluation(cv_split_filename, model, params):
    from sklearn.externals import joblib
    X_train_, y_train_, X_test_, y_test_ = joblib.load(cv_split_filename, mmap_mode="c")
    model.set_params(**params)
    model.fit(X_train_, y_train_)
    test_scores = model.score(X_test_, y_test_)
    return test_scores

In [30]:
from sklearn.grid_search import ParameterGrid
def parallel_grid_search(lb_view, model, cv_split_filenames, param_grid):
    all_tasks = []
    all_parameters = list(ParameterGrid(param_grid))
    for i, params in enumerate(all_parameters):
        task_for_params = []
        for j, cv_split_filename in enumerate(cv_split_filenames):
            t = lb_view.apply(compute_evaluation, cv_split_filename, model, params)
            task_for_params.append(t)
        all_tasks.append(task_for_params)
    return all_parameters, all_tasks

In [31]:
import datetime
def print_progress(tasks):
    progress = np.mean([task.ready() for task_group in tasks for task in task_group])
    print("{0}:{1}%".format(datetime.datetime.now(), progress * 100.0))
    return int(progress * 100.0)

In [32]:
from ipyparallel import Client
client = Client()
print(client.ids)
lb_view = client.load_balanced_view()


[0, 1]

In [40]:
parameters = {
    "svc__gamma": np.logspace(-2, 1, 4),
    "svc__C": np.logspace(-1, 1, 3),
}
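The grid above expands to 4 × 3 = 12 parameter combinations. ParameterGrid does essentially what this stdlib sketch does (a stand-in for illustration only, with the logspace values written out as literals):

```python
import itertools

param_grid = {
    "svc__gamma": [0.01, 0.1, 1.0, 10.0],  # np.logspace(-2, 1, 4)
    "svc__C": [0.1, 1.0, 10.0],            # np.logspace(-1, 1, 3)
}

# Cartesian product of the value lists, one dict per combination
keys = sorted(param_grid)
combos = [dict(zip(keys, values))
          for values in itertools.product(*(param_grid[k] for k in keys))]

print(len(combos))  # 12
print(combos[0])    # {'svc__C': 0.1, 'svc__gamma': 0.01}
```

Each of the 12 dicts is crossed with the 3 CV splits, so parallel_grid_search submits 36 independent tasks to the load-balanced view.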

In [41]:
all_parameters, all_tasks = parallel_grid_search(lb_view, model, cv_filenames, parameters)

In [ ]:
import time
start_time = datetime.datetime.now()
while True:
    progress = print_progress(all_tasks)
    if progress >= 100:
        break
    time.sleep(1)
print("finish")
end_time = datetime.datetime.now()
print((end_time - start_time).total_seconds())